Solutions/Trend Micro Vision One/Data Connectors/AzureFunctionTrendMicroXDR/shared_code/configurations.py (117 lines of code) (raw):

''' Do not import customized_logger or it would have circular import error ''' import os from shared_code.models.oat import RiskLevel VERSION = '1.2.4' SIEM_NAME = 'SentinelAddon' XDR_HOSTS = { 'us': 'https://api.xdr.trendmicro.com', 'eu': 'https://api.eu.xdr.trendmicro.com', 'in': 'https://api.in.xdr.trendmicro.com', 'jp': 'https://api.xdr.trendmicro.co.jp', 'sg': 'https://api.sg.xdr.trendmicro.com', 'au': 'https://api.au.xdr.trendmicro.com', 'uae': 'https://api.uae.xdr.trendmicro.com/', 'mea': 'https://api.mea.xdr.trendmicro.com', } def get_workspace_id(): return os.environ['workspaceId'] def get_workspace_key(): return os.environ['workspaceKey'] def get_api_tokens(): is_key_vault_enabled = ( os.getenv('keyVaultUrl') and os.getenv('keyVaultIdentityClientId') and os.getenv('clpIds') ) if is_key_vault_enabled: # get tokens from key vault from azure.identity import DefaultAzureCredential from azure.keyvault.secrets import SecretClient clp_ids = set(filter(None, os.getenv('clpIds').split(','))) credential = DefaultAzureCredential( managed_identity_client_id=os.getenv('keyVaultIdentityClientId') ) client = SecretClient(vault_url=os.getenv('keyVaultUrl'), credential=credential) tokens = [] for clp_id in clp_ids: try: token = client.get_secret(get_secret_name(clp_id)).value tokens.append(token) except Exception as e: print(e) return tokens else: return set(filter(None, os.environ.get('apiTokens', '').split(','))) def get_xdr_host_url(): xdr_host_url = os.environ.get('xdrHostUrl') return xdr_host_url or XDR_HOSTS[os.environ['regionCode']] def get_storage_connection_string(): return os.environ['AzureWebJobsStorage'] def get_workbench_api_timeout_seconds(): return int(os.environ.get('workbenchApiTimeoutSeconds', 70)) def get_max_workbench_query_minutes(): return int(os.environ.get('maxWorkbenchQueryMinutes', 60)) def get_default_workbench_query_minutes(): return int(os.environ.get('defaultWorkbenchQueryMinutes', 5)) def get_max_oat_query_minutes(): return int(os.environ.get('maxOatQueryMinutes', 60)) def get_default_oat_query_minutes(): return int(os.environ.get('defaultOatQueryMinutes', 5)) def get_oat_query_time_buffer_minutes(): return int(os.environ.get('defaultOatQueryTimeBufferMinutes', 5)) def get_max_oat_data_retention_day(): return int(os.environ.get('maxOatDataRetention', 7)) def get_oat_rows_bulk_count(): return int(os.environ.get('oatRowsBulkCount', 1000)) def get_datetime_format(): return '%Y-%m-%dT%H:%M:%S.000Z' def get_wb_list_v3_datetime_format(): return "%Y-%m-%dT%H:%M:%SZ" def get_oat_pipeline_datetime_format(): return '%Y-%m-%dT%H:%M:%SZ' def get_wb_log_type(): return 'TrendMicro_XDR_WORKBENCH' def get_health_check_log_type(): return 'TrendMicro_XDR_Health_Check' def get_oat_health_check_log_type(): return 'TrendMicro_XDR_OAT_Health_Check' def get_rca_log_type(): return 'TrendMicro_XDR_RCA_Result' def get_rca_task_log_type(): return 'TrendMicro_XDR_RCA_Task' def get_oat_log_type(): return 'TrendMicro_XDR_OAT' def get_user_agent(): return f'TMXDR{SIEM_NAME}/{VERSION}' def get_secret_name(clp_id): return f'tmv1-entity-{clp_id}' def get_oat_config(): return { 'riskLevels': [ RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL, ], 'hasDetail': True, 'description': get_user_agent(), } def get_oat_pipeline_task_queue_name(): return 'oat-pipeline-task-queue' def get_wb_list_queue_name(): return 'workbench-list-queue' def get_wb_detail_queue_name(): return 'workbench-queue' def get_execution_time(): return {'primary': [5, 15, 25, 35, 45, 55], 'secondary': [0, 10, 20, 30, 40, 50]} def get_max_proactive_retry_minutes(): return int(os.environ.get('maxProactiveRetryMinutes', 60)) def get_proactive_retry_time_interval_minutes(): return int(os.environ.get('proactiveRetryTimeIntervalMinutes', 8)) def get_retry_time_interval_minutes(): return int(os.environ.get('retryTimeIntervalMinutes', 30)) def get_query_aggressive_workbench(): return bool(os.environ.get("queryAggressiveWorkbench", False)) def get_query_custom_workbench(): return bool(os.environ.get("queryCustomWorkbench", False))